package com.study.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming01_WordCount {
  def main(args: Array[String]): Unit = {
    //TODO Create the environment object
    //StreamingContext takes two parameters: the environment configuration and the batch interval (i.e. the collection period)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //TODO Processing logic
    //Read data from the socket port
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
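    //To feed this stream locally, you can run a netcat server in another terminal,
    //e.g. `nc -lk 9999` (assumes netcat is installed), and type lines of text there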
    val words = lines.flatMap(_.split(" "))
    val wordToOne = words.map((_, 1))
    val wordToCount = wordToOne.reduceByKey(_ + _)
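    //Note: reduceByKey aggregates within each 3-second batch only; counts are not
    //carried across batches (stateful counting would need updateStateByKey/mapWithState)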
    wordToCount.print()
    //The Spark Streaming receiver is a long-running task, so the context cannot simply be stopped right away
    //If the main method returns, the application exits with it, so main must be kept from finishing
    //1. Start the receiver
    ssc.start()
    //2. Wait for the receiver to terminate
    ssc.awaitTermination()
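    //A sketch of a graceful shutdown, assuming some external trigger (e.g. another thread) decides when to stop:
    //ssc.stop(stopSparkContext = true, stopGracefully = true)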
  }
}
